01. Work_Flow_Management(using_Airflow)

  • BigQuery에 데이터를 적재하는 작업을 완료했다면(Firebase를 사용하는 방법 혹은 embulk로 연동) 이제 BigQuery 데이터를 주기적으로 돌려야할 경우가 있습니다 ( 특히 Firebase와 연결했을 경우, 비용 절감 및 속도 개선을 위해 반드시 필요한 작업입니다 )
  • 예를 들어 Firebase 데이터에서 dau, wau, mau같은 주요 지표를 가공해서 지표만 따로 관리해야 할 필요성을 느낄 수 있습니다
  • 우분투 서버에서 Cron을 돌리는 방법 혹은 Task 관리 도구를 사용하는 방법 등이 있습니다

  • Cron을 돌리는 방법은 Cron의 수가 적거나, 연결된 작업이 없을 경우 간편하게 사용할 수 있습니다(또한 AWS의 람다를 이용하는 방법도!)
  • 그러나 Flow가 복잡하다면 Task 관리 도구 (oozie, luigi, airflow 등)을 사용해 관리할 수 있습니다

  • 본 문서에선 Airflow를 활용한 내용을 알려드리겠습니다

Airflow

  • Airflow는 Apache 오픈 소스에서 인큐베이터 단계에 있는 소프트웨어입니다
  • Python을 지원하기 때문에 python으로 쉽게 작성할 수 있습니다
  • Airflow 콘솔이 존재하기 때문에 쉽게 Task 관리를 할 수 있습니다
  • Airflow가 생소할 수 있기 때문에 Airflow 사용법을 먼저 익힌 후, BigQuery에 적용해보겠습니다
  • 1.8.1 version 이후부터 라이브러리 이름이 airflow에서 apache-airflow로 변경되었습니다!

1) Airflow 설치

  • python 라이브러리가 존재하기 때문에 pip로 쉽게 설치할 수 있습니다
  • airflow webserver -p 8080 에서 port를 직접 수정하실 수 있습니다 ( 참고로 8080은 zeppelin에서도 사용하고 있습니다 )
    pip3 install apache-airflow
    airflow initdb
    airflow webserver -p 8080
    localhost:8080
    #### Airflow 메인 화면

(왼쪽부터 설명드리자면)

  • 메인 화면엔 정의되어 있는 DAG들을 확인할 수 있습니다. 현재는 example들이 보이네요!
  • example을 보고싶지 않다면 airflow 폴더 안에 있는 airflow.cfg에서 load_examples = False로 지정해주면 됩니다!
  • DAG 는 Directed Acyclic Graph의 약자로 Airflow에선 workflow라고 설명하고 있습니다
  • Schedule은 예정된 스케쥴로 cron의 형태와 동일하게 표현이 가능합니다
  • Owner는 소유자를 뜻하는 것으로 airflow에서 user를 등록할 수 있습니다
  • Recent Tasks에 최근 실행된 Task들이 나타나며, 실행 완료된 것은 초록색으로 재시도는 노란색 실패는 빨간색으로 표시됩니다
  • DAG Runs에도 Recent Tasks와 같은 형태로 표시됩니다
  • Links는 단축 링크로 연결되어 있습니다. 모두 한번씩 눌러보세요!

2) Pipeline 정의

  • Airflow는 Airflow Home( default는 ~/airflow )의 dags 폴더 안에 python 파일을 넣어서 실행합니다
  • Operator라는 것을 통해 task를 정의합니다. Python, Bash, BigQuery 등등의 Operator가 있습니다
  • Operator는 링크에 API 설명이 나와있습니다!
  • 각 Operator는 unique한 task_id를 가지고 있습니다. BashOperator에선 bash_command, PythonOperator에선 python_callable, BigQueryOperator에선 bql같은 파라미터를 가지고 있습니다
  • 아래 코드를 dags 폴더 아래에 test.py로 넣어주세요

In [1]:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
    

# start_date를 현재날자보다 과거로 설정하면, backfill(과거 데이터를 채워넣는 액션)이 진행됩니다
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 10, 1),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill', # Only celery option
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

# dag 객체 생성
dag = DAG('test', description='First DAG', 
          schedule_interval = '55 14 * * *', 
          default_args=default_args)


t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

# BashOperator를 사용
# task_id는 unique한 이름이어야 합니다
# bash_command는 bash에서 date를 입력한다는 뜻

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

# set_upstream은 t1 작업이 끝나야 t2가 진행된다는 뜻
t2.set_upstream(t1)
# t1.set_downstream(t2)와 동일한 표현입니다
# t1 >> t2 와 동일 표현
t3.set_upstream(t1)


[2017-10-04 00:37:45,020] {__init__.py:36} INFO - Using executor SequentialExecutor
  • 위 소스를 [Airflow Home]/dags/ 에 test.py로 저장해주세요!

  • DAGs는 각 Workflow를 뜻하고, Operator는 DAG 내에서 정의되는 작업 함수입니다. Operator가 DAG에서 호출되는 것이 Task입니다

  • 객체 지향 언어의 Class는 Operator, Object는 Task

3) Airflow 기본 명령어

airflow list_dags
- airflow의 dags 폴더 아래에 *.py 파일을 넣은 후, 위 명령어를 입력하면 DAGs의 리스트를 알 수 있습니다
- 여기에 나오는 dags의 이름은 코드에서 DAG 객체를 생성할 때 넣은 이름이 나타납니다

airflow list_tasks test
- test라는 dags안에 있는 tasks의 리스트를 알 수 있습니다

airflow list_tasks test --tree
- test라는 dags안에 있는 tasks를 tree 형태로 알 수 있습니다

airflow test [DAG id] [Task id] [date]
예시) airflow test test print_date 2017-10-01
- DAG의 Task 단위로 test해볼 수 있습니다

airflow scheduler
- Test를 모두 완료한 후, 스케쥴러를 실행해줍니다. DAG 코드에 정의된 스케쥴에 따라 실행해줍니다

airflow -h
- airflow 관련 help 명령어입니다

  • 메인 화면에서 DAG의 이름을 클릭하면 Graph View로 볼 수 있습니다

  • Tree View로도 볼 수 있습니다!
  • 빨간색 네모 안에 있는 초록색 칸을 클릭하면 아래와 같은 설정이 나옵니다

  • 각종 행동 및 설정을 할 수 있습니다

4) BigQueryOperator

  • BigQuery 관련 Operator는 아래와 같습니다
    • BigQueryCheckOperator : Performs checks against a SQL query that will return a single row with different values.
    • BigQueryValueCheckOperator : Performs a simple value check using SQL code.
    • BigQueryIntervalCheckOperator : Checks that the values of metrics given as SQL expressions are within a certain tolerance of the ones from days_back before.
    • BigQueryOperator : Executes BigQuery SQL queries in a specific BigQuery database.
    • BigQueryToBigQueryOperator : Copy a BigQuery table to another BigQuery table.
    • BigQueryToCloudStorageOperator : Transfers a BigQuery table to a Google Cloud Storage bucket
  • 여기서 BigQueryOperator를 사용해 결과를 테이블에 저장하는 DAG를 생성하겠습니다!
  • 생성하기 전에, localhost:8080에 접속해 구글 클라우드 관련 설정을 수정하겠습니다
  • Admin - Connections을 눌러주세요

  • bigquery_default라고 설정되어있는 곳에서 연필 버튼을 클릭해주세요 ( bigquery_default가 Operator에서 bigquery_conn_id입니다 )

Airflow - Bigquery Connection Setting


In [8]:
from datetime import timedelta, datetime

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

    
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 10, 1),
    'email': ['yourmail@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': False,
    'retry_delay': timedelta(minutes=2),
}

dag = DAG('airflow_bigquery', default_args=default_args)

t1 = BigQueryOperator(
  task_id='bigquery_test',
  bql='SELECT COUNT(vendor_id) FROM [nyc-tlc:yellow.trips]',
  destination_dataset_table='temp.airflow_test1',
  bigquery_conn_id='bigquery_default', 
  delegate_to=True,
  udf_config=False,
  dag=dag,
)

def print_hello():
    return 'Hello Airflow'

t3 = PythonOperator(
    task_id='python_operator',
    python_callable = print_hello,
    dag = dag)

t4 = BigQueryOperator(
  task_id='bigquery_test2',
  bql='SELECT COUNT(vendor_id) as user FROM [nyc-tlc:yellow.trips]',
  destination_dataset_table='temp.airflow_test2',
  bigquery_conn_id='bigquery_default', 
  delegate_to=True,
  udf_config=False,
  dag=dag,
)

t1 >> t3 >> t4


Out[8]:
<Task(BigQueryOperator): bigquery_test2>
  • 위 파일을 dags 폴더안에 넣고, test를 거친 후 airflow scheduler 명령어를 입력해주면! temp 데이터셋에(없다면 생성해주세요!) airflow_test1, airflow_test2 테이블이 생성되어 있을거에요!

Executor

  • Airflow의 Executor는 3가지가 존재합니다
  • SequentialExecutor(default), LocalExecutor, CeleryExecutor
  • 아래 사진의 각각의 Executor별 설명을 잘 담고 있습니다!

사진 출처 동영상